---
id: message-passing
title: Workflow message passing - Python SDK
sidebar_label: Messages
description: Develop with Queries, Signals, and Updates with the Temporal Python SDK.
toc_max_heading_level: 3
keywords:
  - temporal python signals
  - send signal from client
  - send signal from workflow
  - signal with start
  - workflow queries
  - sending queries
  - workflow updates
  - dynamic workflows
  - dynamic activities
  - dynamic signals
  - dynamic queries
tags:
  - Workflows
  - Messages
  - Signals
  - Queries
  - Updates
  - Python SDK
  - Temporal SDKs
---

A Workflow can act like a stateful web service that receives messages: Queries, Signals, and Updates.
The Workflow implementation defines these endpoints via handler methods that can react to incoming messages and return values.
Temporal Clients use messages to read Workflow state and control its execution.
See [Workflow message passing](/encyclopedia/workflow-message-passing) for a general overview of this topic.
This page introduces these features for the Temporal Python SDK.

## Write message handlers {#writing-message-handlers}

:::info
The code that follows is part of a working message passing [sample](https://github.com/temporalio/samples-python/tree/main/message_passing/introduction).
:::

Follow these guidelines when writing your message handlers:

- Message handlers are defined as methods on the Workflow class, using one of the three decorators: [`@workflow.query`](https://python.temporal.io/temporalio.workflow.html#query), [`@workflow.signal`](https://python.temporal.io/temporalio.workflow.html#signal), and [`@workflow.update`](https://python.temporal.io/temporalio.workflow.html#update).
- The parameters and return values of handlers and the main Workflow function must be [serializable](/dataconversion).
- Prefer [data classes](https://docs.python.org/3/library/dataclasses.html) to multiple input parameters.
  Data class parameters allow you to add fields without changing the calling signature.
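
To illustrate the last guideline, here is a minimal sketch in plain Python with no Temporal APIs involved. `ApproveInput` mirrors the input class used in the Signal example on this page; the `comment` field and the standalone `approve` function are hypothetical, standing in for a newly added field and a handler method:

```python
from dataclasses import dataclass


@dataclass
class ApproveInput:
    name: str
    # Hypothetical field added in a later release. The default value keeps
    # callers that only pass `name` working.
    comment: str = ""


def approve(input: ApproveInput) -> str:
    # Stand-in for a handler method: the signature stays `approve(input)`
    # no matter how many fields the data class gains.
    suffix = f" ({input.comment})" if input.comment else ""
    return f"approved by {input.name}{suffix}"


old_style = approve(ApproveInput(name="me"))
new_style = approve(ApproveInput(name="me", comment="looks good"))
```

Because `comment` has a default, callers that construct `ApproveInput` with only `name` keep working, and the handler still takes a single parameter.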

### Query handlers {#queries}

A [Query](/encyclopedia/workflow-message-passing#sending-queries) is a synchronous operation that retrieves state from a Workflow Execution:

```python
class Language(IntEnum):
    Chinese = 1
    English = 2
    French = 3

@dataclass
class GetLanguagesInput:
    include_unsupported: bool

@workflow.defn
class GreetingWorkflow:
    def __init__(self) -> None:
        self.greetings = {
            Language.Chinese: "你好，世界",
            Language.English: "Hello, world",
        }

    @workflow.query
    def get_languages(self, input: GetLanguagesInput) -> list[Language]:
        # 👉 A Query handler returns a value: it can inspect but must not mutate the Workflow state.
        if input.include_unsupported:
            return list(Language)
        else:
            return list(self.greetings)
```

- The Query decorator can accept arguments.
  Refer to the API docs: [`@workflow.query`](https://python.temporal.io/temporalio.workflow.html#query).

- A Query handler uses `def`, not `async def`.
  You can't perform async operations like executing an Activity in a Query handler.

### Signal handlers {#signals}

A [Signal](/encyclopedia/workflow-message-passing#sending-signals) is an asynchronous message sent to a running Workflow Execution to change its state and control its flow:

```python
@dataclass
class ApproveInput:
    name: str

@workflow.defn
class GreetingWorkflow:
    ...
    @workflow.signal
    def approve(self, input: ApproveInput) -> None:
        # 👉 A Signal handler mutates the Workflow state but cannot return a value.
        self.approved_for_release = True
        self.approver_name = input.name
```

- The Signal decorator can accept arguments.
  Refer to the API docs: [`@workflow.signal`](https://python.temporal.io/temporalio.workflow.html#signal).

- The handler should not return a value.
  The response is sent immediately from the server, without waiting for the Workflow to process the Signal.

- Signal (and Update) handlers can be `async def`.
  This allows you to use Activities, Child Workflows, durable [`asyncio.sleep`](https://docs.python.org/3/library/asyncio-task.html#asyncio.sleep) Timers, [`workflow.wait_condition`](https://python.temporal.io/temporalio.workflow.html#wait_condition) conditions, and more.
  See [Async handlers](#async-handlers) and [Workflow message passing](/encyclopedia/workflow-message-passing) for guidelines on safely using async Signal and Update handlers.

### Update handlers and validators {#updates}

An [Update](/encyclopedia/workflow-message-passing#sending-updates) is a trackable synchronous request sent to a running Workflow Execution.
It can change the Workflow state, control its flow, and return a result.
The sender must wait until the Worker accepts or rejects the Update.
The sender may wait further to receive a returned value or an exception if something goes wrong:

```python
class Language(IntEnum):
    Chinese = 1
    English = 2
    French = 3

@workflow.defn
class GreetingWorkflow:
    ...
    @workflow.update
    def set_language(self, language: Language) -> Language:
        # 👉 An Update handler can mutate the Workflow state and return a value.
        previous_language, self.language = self.language, language
        return previous_language

    @set_language.validator
    def validate_language(self, language: Language) -> None:
        if language not in self.greetings:
            # 👉 In an Update validator you raise any exception to reject the Update.
            raise ValueError(f"{language.name} is not supported")
```

- The Update decorator can take arguments (such as `name`, `dynamic`, and `unfinished_policy`), as described in the API reference docs for [`@workflow.update`](https://python.temporal.io/temporalio.workflow.html#update).

- About validators:
  - Use validators to reject an Update before it is written to History.
    Validators are always optional.
    If you don't need to reject Updates, you can skip them.
  - The SDK automatically provides a validator decorator named `@<update-handler-name>.validator`.
    The validator must accept the same argument types as the handler and return `None`.

- Accepting and rejecting Updates with validators:
  - To reject an Update, raise an exception of any type in the validator.
  - Without a validator, Updates are always accepted.
- Validators and Event History:
  - The `WorkflowExecutionUpdateAccepted` event is written into the History whether the acceptance was automatic or programmatic.
  - When a Validator raises an error, the Update is rejected and `WorkflowExecutionUpdateAccepted` _won't_ be added to the Event History.
    The caller receives an "Update failed" error.

- Use [`workflow.current_update_info`](https://python.temporal.io/temporalio.workflow.html#current_update_info) to obtain information about the current Update.
  This includes the Update ID, which can be useful for deduplication when using Continue-As-New: see [Ensuring your messages are processed exactly once](/encyclopedia/workflow-message-passing#exactly-once-message-processing).
- Update (and Signal) handlers can be `async def`, letting them use Activities, Child Workflows, durable [`asyncio.sleep`](https://docs.python.org/3/library/asyncio-task.html#asyncio.sleep) Timers, [`workflow.wait_condition`](https://python.temporal.io/temporalio.workflow.html#wait_condition) conditions, and more.
  See [Async handlers](#async-handlers) and [Workflow message passing](/encyclopedia/workflow-message-passing) for safe usage guidelines.

## Send messages {#send-messages}

To send Queries, Signals, or Updates, you call methods on a [WorkflowHandle](https://python.temporal.io/temporalio.client.WorkflowHandle.html) object:

- Use [start_workflow](https://python.temporal.io/temporalio.client.Client.html#start_workflow) to start a Workflow and return its handle.

- Use [get_workflow_handle_for](https://python.temporal.io/temporalio.client.Client.html#get_workflow_handle_for) to retrieve a typed Workflow handle by its Workflow Id.

For example:

```python
client = await Client.connect("localhost:7233")
workflow_handle = await client.start_workflow(
    GreetingWorkflow.run, id="greeting-workflow-1234", task_queue="my-task-queue"
)
```

To check the argument types required when sending messages -- and the return type for Queries and Updates -- refer to the corresponding handler method in the Workflow Definition.

:::warning Using Continue-as-New and Updates

- Temporal _does not_ support Continue-as-New functionality within Update handlers.
- Complete all handlers _before_ using Continue-as-New.
- Use Continue-as-New from your main Workflow Definition method, just as you would complete or fail a Workflow Execution.

:::

### Send a Query {#send-query}

Use [`WorkflowHandle.query`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#query) to send a Query to a Workflow Execution:

```python
supported_languages = await workflow_handle.query(
    GreetingWorkflow.get_languages, GetLanguagesInput(include_unsupported=False)
)
```

- Sending a Query doesn’t add events to a Workflow's Event History.

- You can send Queries to closed Workflow Executions within a Namespace's Workflow retention period.
  This includes Workflows that have completed, failed, or timed out.
  Querying terminated Workflows is not safe and, therefore, not supported.

- A Worker must be online and polling the Task Queue to process a Query.

### Send a Signal {#send-signal}

You can send a Signal to a Workflow Execution from a Temporal Client or from another Workflow Execution.
However, you can only send Signals to Workflow Executions that haven’t closed.

#### Send a Signal from a Client {#send-signal-from-client}

Use [`WorkflowHandle.signal`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#signal) to send a Signal:

```python
await workflow_handle.signal(GreetingWorkflow.approve, ApproveInput(name="me"))
```

- The call returns when the server accepts the Signal; it does _not_ wait for the Signal to be delivered to the Workflow Execution.

- The [WorkflowExecutionSignaled](/references/events#workflowexecutionsignaled) Event appears in the Workflow's Event History.

#### Send a Signal from a Workflow {#send-signal-from-workflow}

A Workflow can send a Signal to another Workflow, known as an _External Signal_.
You'll need a Workflow handle for the external Workflow.
Use [`get_external_workflow_handle_for`](https://python.temporal.io/temporalio.workflow.html#get_external_workflow_handle_for):

<div style={{backgroundColor: '#e0e0e0',padding: '0px 15px',borderRadius: '5px',border: '1px solid #cccccc',display: 'inline-block'}}><a href="https://github.com/temporalio/documentation/blob/main/sample-apps/python/signal_your_workflow/signal_external_wf_dacx.py"><tt>See full sample</tt></a></div>

```python
# ...
@workflow.defn
class WorkflowB:
    @workflow.run
    async def run(self) -> None:
        handle = workflow.get_external_workflow_handle_for(WorkflowA.run, "workflow-a")
        await handle.signal(WorkflowA.your_signal, "signal argument")
```

When an External Signal is sent:

- A [SignalExternalWorkflowExecutionInitiated](/references/events#signalexternalworkflowexecutioninitiated) Event appears in the sender's Event History.
- A [WorkflowExecutionSignaled](/references/events#workflowexecutionsignaled) Event appears in the recipient's Event History.

#### Signal-With-Start {#signal-with-start}

Signal-With-Start allows a Client to send a Signal to a Workflow Execution, starting the Execution if it is not already running.
To use Signal-With-Start, call the [`start_workflow`](https://python.temporal.io/temporalio.client.Client.html#start_workflow) method and pass the `start_signal` argument with the name of your Signal:

<div style={{backgroundColor: '#e0e0e0',padding: '0px 15px',borderRadius: '5px',border: '1px solid #cccccc',display: 'inline-block'}}><a href="https://github.com/temporalio/documentation/blob/main/sample-apps/python/signal_your_workflow/signal_with_start_dacx.py"><tt>See full sample</tt></a></div>

```python
from temporalio.client import Client
# ...
async def main():
    client = await Client.connect("localhost:7233")
    await client.start_workflow(
        GreetingWorkflow.run,
        id="your-signal-with-start-workflow",
        task_queue="signal-tq",
        start_signal="submit_greeting",
        start_signal_args=["User Signal with Start"],
    )
```

### Send an Update {#send-update-from-client}

An Update is a synchronous, blocking call that can change Workflow state, control its flow, and return a result.

A client sending an Update must wait until the Server delivers the Update to a Worker.
Workers must be available and responsive.
If you need a response as soon as the Server receives the request, use a Signal instead.
Also note that you can't send Updates to other Workflow Executions or perform an Update equivalent of Signal-With-Start.

- `WorkflowExecutionUpdateAccepted` is added to the Event History when the Worker confirms that the Update passed validation.
- `WorkflowExecutionUpdateCompleted` is added to the Event History when the Worker confirms that the Update has finished.

To send an Update to a Workflow Execution, you can:

- Call [`execute_update`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#execute_update) and wait for the Update to complete.
  This code fetches an Update result:

  ```python
  previous_language = await workflow_handle.execute_update(
      GreetingWorkflow.set_language, Language.Chinese
  )
  ```

- Send [`start_update`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#start_update) to receive an [`UpdateHandle`](https://python.temporal.io/temporalio.client.WorkflowUpdateHandle.html) as soon as the Update is accepted or rejected.

  - Use this `UpdateHandle` later to fetch your results.
  - `async def` Update handlers normally perform long-running asynchronous operations, such as executing an Activity.
  - `start_update` only waits until the Worker has accepted or rejected the Update, not until all asynchronous operations are complete.

  For example:

  ```python
  # Wait until the update is accepted.
  # WorkflowUpdateStage is imported from temporalio.client.
  update_handle = await workflow_handle.start_update(
      HelloWorldWorkflow.set_greeting,
      HelloWorldInput("World"),
      wait_for_stage=WorkflowUpdateStage.ACCEPTED,
  )
  # Wait until the update is completed
  update_result = await update_handle.result()
  ```

  For more details, see the [Async handlers](#async-handlers) section.

To obtain an Update handle, you can:

- Use [`start_update`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#start_update) to start an Update and return the handle, as shown in the preceding example.
- Use [`get_update_handle_for`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#get_update_handle_for) to fetch a handle for an in-progress Update using the Update ID.

:::info SEND MESSAGES WITHOUT TYPE SAFETY

In real-world development, sometimes you may be unable to import Workflow Definition method signatures.
When you don't have access to the Workflow Definition, or it isn't written in Python, you can still send messages by using APIs that aren't type-safe and invoking methods dynamically by name.
Pass method names instead of method objects to:

- [`Client.start_workflow`](https://python.temporal.io/temporalio.client.Client.html#start_workflow)
- [`WorkflowHandle.query`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#query)
- [`WorkflowHandle.signal`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#signal)
- [`WorkflowHandle.execute_update`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#execute_update)
- [`WorkflowHandle.start_update`](https://python.temporal.io/temporalio.client.WorkflowHandle.html#start_update)

Use these non-type-safe APIs to obtain Workflow handles:

- [`get_workflow_handle`](https://python.temporal.io/temporalio.client.Client.html#get_workflow_handle)
- [`get_external_workflow_handle`](https://python.temporal.io/temporalio.workflow.html#get_external_workflow_handle)

:::

## Message handler patterns {#message-handler-patterns}

This section covers common write operations, such as Signal and Update handlers.
It doesn't apply to pure read operations, like Queries or Update Validators.

:::tip

For additional information, see [Inject work into the main Workflow](/encyclopedia/workflow-message-passing#injecting-work-into-main-workflow), [Ensuring your messages are processed exactly once](/encyclopedia/workflow-message-passing#exactly-once-message-processing), and [this sample](https://github.com/temporalio/samples-python/blob/message-passing/message_passing/safe_message_handlers/README.md) demonstrating safe `async` message handling.

:::

### Use async handlers {#async-handlers}

Signal and Update handlers can be `async def` as well as `def`.
Using `async def` allows you to use `await` with Activities, Child Workflows, [`asyncio.sleep`](https://docs.python.org/3/library/asyncio-task.html#asyncio.sleep) Timers, [`workflow.wait_condition`](https://python.temporal.io/temporalio.workflow.html#wait_condition) conditions, etc.
This expands what a handler can do, but it also means that handler executions and your main Workflow method all run concurrently, switching between one another at `await` calls.
To use `async def` handlers safely, you need to understand what can go wrong.
See [Workflow message passing](/encyclopedia/workflow-message-passing) for guidance on safe usage of async Signal and Update handlers, the [Safe message handlers](https://github.com/temporalio/samples-python/tree/main/updates_and_signals/safe_message_handlers) sample, and the [Controlling handler concurrency](#control-handler-concurrency) and [Waiting for message handlers to finish](#wait-for-message-handlers) sections below.

The following code executes an Activity that makes a network call to a remote service.
It modifies the Update handler from earlier on this page, turning it into an `async def`:

```python
@activity.defn
async def call_greeting_service(to_language: Language) -> Optional[str]:
    await asyncio.sleep(0.2)  # Pretend that we are calling a remote service.
    greetings = {
        Language.Arabic: "مرحبا بالعالم",
        Language.Chinese: "你好，世界",
        Language.English: "Hello, world",
        Language.French: "Bonjour, monde",
        Language.Hindi: "नमस्ते दुनिया",
        Language.Spanish: "Hola mundo",
    }
    return greetings.get(to_language)


@workflow.defn
class GreetingWorkflow:
    def __init__(self) -> None:
        self.lock = asyncio.Lock()
        ...
    ...
    @workflow.update
    async def set_language(self, language: Language) -> Language:
        if language not in self.greetings:
            # 👉 Use a lock here to ensure that multiple calls to set_language are processed in order.
            async with self.lock:
                greeting = await workflow.execute_activity(
                    call_greeting_service,
                    language,
                    start_to_close_timeout=timedelta(seconds=10),
                )
                if greeting is None:
                    # 👉 An update validator cannot be async, so cannot be used to check that the remote
                    # call_greeting_service supports the requested language. Raising ApplicationError
                    # will fail the Update, but the WorkflowExecutionUpdateAccepted event will still be
                    # added to history.
                    raise ApplicationError(
                        f"Greeting service does not support {language.name}"
                    )
                self.greetings[language] = greeting
        previous_language, self.language = self.language, language
        return previous_language
```

After updating the code to use an `async def`, your Update handler can schedule an Activity and await the result.
Although an `async def` Signal handler can also execute an Activity, using an Update handler allows the Client to receive a result or error once the Activity completes.
This lets your client track the progress of asynchronous work performed by the Update's Activities, Child Workflows, etc.

### Add wait conditions to block

Sometimes, `async def` Signal or Update handlers need to meet certain conditions before they should continue.
You can use [`workflow.wait_condition`](https://python.temporal.io/temporalio.workflow.html#wait_condition) to prevent the code from proceeding until a condition is true.
You specify the condition by passing a function that returns `True` or `False`, and you can optionally set a timeout.
This is an important feature that helps you control your handler logic.
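
The sketch below models the predicate-plus-timeout behavior in plain `asyncio` so that it can run outside a Workflow. The local `wait_condition` function is a polling stand-in written for this illustration, not the SDK function, and `approve_later` is a hypothetical coroutine playing the role of a Signal handler. It assumes that a missed deadline surfaces as `asyncio.TimeoutError`; check the API reference for the SDK's exact behavior:

```python
import asyncio


async def wait_condition(predicate, *, timeout=None, poll_interval=0.01):
    """Polling stand-in for illustration only; not the SDK implementation."""

    async def _wait() -> None:
        while not predicate():
            await asyncio.sleep(poll_interval)

    # asyncio.wait_for raises asyncio.TimeoutError if the deadline passes.
    await asyncio.wait_for(_wait(), timeout)


async def demo():
    state = {"approved": False}

    async def approve_later() -> None:
        # Hypothetical "handler" that flips the flag after a short delay.
        await asyncio.sleep(0.05)
        state["approved"] = True

    # Nothing sets the flag yet, so this wait hits its timeout.
    try:
        await wait_condition(lambda: state["approved"], timeout=0.02)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True

    # Now the "handler" runs concurrently and the wait can complete.
    task = asyncio.create_task(approve_later())
    await wait_condition(lambda: state["approved"], timeout=1.0)
    await task
    return timed_out, state["approved"]


timed_out, approved = asyncio.run(demo())
```

The first wait gives up because nothing changes the state, while the second returns once the flag is set. In Workflow code you would call the SDK's `workflow.wait_condition` instead of this stand-in.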

Here are three important use cases for `workflow.wait_condition`:

- Wait for a Signal or Update to arrive.
- Wait in a handler until it's appropriate to continue.
- Wait in the main Workflow until all active handlers have finished.

#### Wait for a Signal or Update to arrive

It's common to use `workflow.wait_condition` to wait for a particular Signal or Update to be sent by a Client:

```python
@workflow.defn
class GreetingWorkflow:
    def __init__(self) -> None:
        self.approved_for_release = False
        self.approver_name: Optional[str] = None

    @workflow.signal
    def approve(self, input: ApproveInput) -> None:
        self.approved_for_release = True
        self.approver_name = input.name

    @workflow.run
    async def run(self) -> str:
        await workflow.wait_condition(lambda: self.approved_for_release)
        ...
        return self.greetings[self.language]
```

#### Use wait conditions in handlers {#wait-in-message-handler}

It's common to use a Workflow wait condition to wait until a handler should start.

Consider a `ready_for_update_to_execute` method that runs before your Update handler executes.
The `workflow.wait_condition` method waits until your condition is met:

```python
@workflow.update
async def my_update(self, update_input: UpdateInput) -> str:
    await workflow.wait_condition(
        lambda: self.ready_for_update_to_execute(update_input)
    )
```

You can also use wait conditions anywhere else in the handler to wait for a specific condition to become true.
This allows you to write handlers that pause at multiple points, each time waiting for a required condition to become true.

#### Ensure your handlers finish before the Workflow completes {#wait-for-message-handlers}

Workflow wait conditions can ensure your handler completes before a Workflow finishes.
When your Workflow uses `async def` Signal or Update handlers, your main Workflow method can return or continue-as-new while a handler is still waiting on an async task, such as an Activity result.
The Workflow completing may interrupt the handler before it finishes crucial work and cause client errors when trying to retrieve Update results.
Use [`workflow.wait_condition`](https://python.temporal.io/temporalio.workflow.html#wait_condition) and [`all_handlers_finished`](https://python.temporal.io/temporalio.workflow.html#all_handlers_finished) to address this problem and allow your Workflow to end smoothly:

```python
@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> str:
        ...
        await workflow.wait_condition(workflow.all_handlers_finished)
        return "workflow-result"
```

By default, your Worker will log a warning when you allow a Workflow Execution to finish with unfinished handler executions.
You can silence these warnings on a per-handler basis by passing the `unfinished_policy` argument to the [`@workflow.signal`](https://python.temporal.io/temporalio.workflow.html#signal) or [`@workflow.update`](https://python.temporal.io/temporalio.workflow.html#update) decorator:

```python
@workflow.update(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)
async def my_update(self) -> None:
    ...
```

See [Finishing handlers before the Workflow completes](/encyclopedia/workflow-message-passing#finishing-message-handlers) for more information.
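
The following plain `asyncio` sketch is an analogy for the problem described in this section, not Temporal code. The hypothetical `run_workflow` coroutine plays the role of the main Workflow method, and `handler` plays an `async def` message handler. When `asyncio.run` returns, any task still pending is cancelled, which loosely mirrors a handler being interrupted when the Workflow completes:

```python
import asyncio


async def run_workflow(wait_for_handlers: bool) -> dict:
    state = {"handler_finished": False}
    in_flight = []

    async def handler() -> None:
        await asyncio.sleep(0.05)  # pretend to await an Activity result
        state["handler_finished"] = True

    # A message arrives and its handler starts running concurrently.
    in_flight.append(asyncio.create_task(handler()))
    await asyncio.sleep(0)  # let the handler start

    if wait_for_handlers:
        # Analogous in spirit to waiting until all handlers have finished.
        await asyncio.gather(*in_flight)
    return state


interrupted = asyncio.run(run_workflow(wait_for_handlers=False))
completed = asyncio.run(run_workflow(wait_for_handlers=True))
```

In the first run the main coroutine returns while the handler is still sleeping, so the handler's state change never happens. In the second run the main coroutine waits, and the change is visible in the returned state.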

### Use `@workflow.init` to operate on Workflow input before any handler executes

Normally, your Workflow `__init__` method won't have any parameters.
However, if you use the `@workflow.init` decorator on your `__init__` method, you can give it the same [Workflow parameters](/develop/python/core-application#workflow-parameters) as your `@workflow.run` method.
The SDK will then ensure that your `__init__` method receives the Workflow input arguments that the [Client sent](/develop/python/temporal-clients#start-workflow-execution).
(The Workflow input arguments are also passed to your `@workflow.run` method -- that always happens, whether or not you use the `@workflow.init` decorator.)
This is useful if you have message handlers that need access to workflow input: see [Initializing the Workflow first](/encyclopedia/workflow-message-passing#workflow-initializers).

Here's an example.
Notice that `__init__` and `get_greeting` must have the same parameters, with the same type annotations:

```python
@dataclass
class MyWorkflowInput:
    name: str


@workflow.defn
class WorkflowRunSeesWorkflowInitWorkflow:
    @workflow.init
    def __init__(self, workflow_input: MyWorkflowInput) -> None:
        self.name_with_title = f"Sir {workflow_input.name}"
        self.title_has_been_checked = False

    @workflow.run
    async def get_greeting(self, workflow_input: MyWorkflowInput) -> str:
        await workflow.wait_condition(lambda: self.title_has_been_checked)
        return f"Hello, {self.name_with_title}"

    @workflow.update
    async def check_title_validity(self) -> bool:
        # 👉 The handler is now guaranteed to see the workflow input
        # after it has been processed by __init__.
        is_valid = await workflow.execute_activity(
            check_title_validity,
            self.name_with_title,
            schedule_to_close_timeout=timedelta(seconds=10),
        )
        self.title_has_been_checked = True
        return is_valid
```

### Use `asyncio.Lock` to prevent concurrent handler execution {#control-handler-concurrency}

Concurrent processes can interact in unpredictable ways.
Incorrectly written [concurrent message-passing](/encyclopedia/workflow-message-passing#message-handler-concurrency) code may not work correctly when multiple handler instances run simultaneously.
Here's an example of a pathological case:

```python
@workflow.defn
class MyWorkflow:

    @workflow.signal
    async def bad_async_handler(self):
        data = await workflow.execute_activity(
            fetch_data, start_to_close_timeout=timedelta(seconds=10)
        )
        self.x = data.x
        # 🐛🐛 Bug!! If multiple instances of this handler are executing concurrently, then
        # there may be times when the Workflow has self.x from one Activity execution and self.y from another.
        await asyncio.sleep(1)  # or await anything else
        self.y = data.y
```

Coordinating access using `asyncio.Lock` corrects this code.
Locking makes sure that only one handler instance can execute a specific section of code at any given time:

```python
@workflow.defn
class MyWorkflow:
    def __init__(self) -> None:
        ...
        self.lock = asyncio.Lock()
        ...

    @workflow.signal
    async def safe_async_handler(self):
        async with self.lock:
            data = await workflow.execute_activity(
                fetch_data, start_to_close_timeout=timedelta(seconds=10)
            )
            self.x = data.x
            # ✅ OK: the scheduler may switch now to a different handler execution, or to the main workflow
            # method, but no other execution of this handler can run until this execution finishes.
            await asyncio.sleep(1)  # or await anything else
            self.y = data.y
```
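
To see the interleaving outside a Workflow, the sketch below reproduces both handlers with plain `asyncio`. The `State` class, the `pause` parameter, and the `run_pair` helper are hypothetical names for this illustration, and `asyncio.sleep` stands in for the awaited work between the two assignments:

```python
import asyncio


class State:
    def __init__(self) -> None:
        self.x = None
        self.y = None
        self.lock = asyncio.Lock()


async def unsafe_handler(state: State, value: str, pause: float) -> None:
    state.x = value
    await asyncio.sleep(pause)  # another handler execution may run here
    state.y = value


async def safe_handler(state: State, value: str, pause: float) -> None:
    # Only one execution at a time may run the guarded section.
    async with state.lock:
        await unsafe_handler(state, value, pause)


async def run_pair(handler) -> tuple:
    state = State()  # created inside the running event loop
    # "a" pauses longer than "b", so without a lock "b" finishes first.
    await asyncio.gather(handler(state, "a", 0.05), handler(state, "b", 0.01))
    return state.x, state.y


unsafe_result = asyncio.run(run_pair(unsafe_handler))
safe_result = asyncio.run(run_pair(safe_handler))
```

With these pauses, the unlocked run should finish with `x` from the second call and `y` from the first, while the locked run keeps both attributes from the same call.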

## Message handler troubleshooting {#message-handler-troubleshooting}

When sending a Signal, Update, or Query to a Workflow, your Client might encounter the following errors:

- **The client can't contact the server**:
  You'll receive a [`temporalio.service.RPCError`](https://python.temporal.io/temporalio.service.RPCError.html) on which the `status` attribute is [`RPCStatusCode`](https://python.temporal.io/temporalio.service.RPCStatusCode.html) `UNAVAILABLE` (after some retries; see the `retry_config` argument to [`Client.connect`](https://python.temporal.io/temporalio.client.Client.html#connect)).

- **The workflow does not exist**:
  You'll receive a [`temporalio.service.RPCError`](https://python.temporal.io/temporalio.service.RPCError.html) exception on which the `status` attribute is [`RPCStatusCode`](https://python.temporal.io/temporalio.service.RPCStatusCode.html) `NOT_FOUND`.

See [Exceptions in message handlers](/encyclopedia/workflow-message-passing#exceptions) for a non–Python-specific discussion of this topic.

### Problems when sending a Signal {#signal-problems}

When you send a Signal, the only exceptions your request can raise are the `RPCError`s described above.

For Queries and Updates, the Client waits for a response from the Worker, so additional errors may occur while the Worker executes the handler.

### Problems when sending an Update {#update-problems}

When working with Updates, in addition to the `RPCError`s described above, you may encounter these errors:

- **No Workflow Workers are polling the Task Queue**:
  Your request will be retried by the SDK Client indefinitely.
  You can use [`asyncio.timeout`](https://docs.python.org/3/library/asyncio-task.html#timeouts) to impose a timeout.
  This raises a [`temporalio.client.WorkflowUpdateRPCTimeoutOrCancelledError`](https://python.temporal.io/temporalio.client.WorkflowUpdateRPCTimeoutOrCancelledError.html) exception.

- **Update failed**: You'll receive a [`temporalio.client.WorkflowUpdateFailedError`](https://python.temporal.io/temporalio.client.WorkflowUpdateFailedError.html) exception.
  There are two ways this can happen:

  - The Update was rejected by an Update validator defined in the Workflow alongside the Update handler.

  - The Update failed after having been accepted.

  Update failures are like [Workflow failures](/references/failures#errors-in-workflows).
  Issues that cause a Workflow failure in the main method also cause Update failures in the Update handler.
  These might include:

  - A failed Child Workflow
  - A failed Activity (if the Activity retries have been set to a finite number)
  - The Workflow author raising [`ApplicationError`](/references/failures#application-failure)
  - Any error listed in [workflow_failure_exception_types](https://python.temporal.io/temporalio.worker.Worker.html) (empty by default)

- **The handler caused the Workflow Task to fail**:
  A [Workflow Task Failure](/references/failures#errors-in-workflows) causes the server to retry Workflow Tasks indefinitely. What happens to your Update request depends on its stage:
  - If the request hasn't been accepted by the server, you receive a `FAILED_PRECONDITION` [`temporalio.service.RPCError`](https://python.temporal.io/temporalio.service.RPCError.html) exception.
  - If the request has been accepted, it is durable.
    Once the Workflow is healthy again after a code deploy, use an [`UpdateHandle`](https://python.temporal.io/temporalio.client.WorkflowUpdateHandle.html) to fetch the Update result.

- **The Workflow finished while the Update handler execution was in progress**:
  You'll receive a [`temporalio.service.RPCError`](https://python.temporal.io/temporalio.service.RPCError.html) exception with a `status` attribute of [`RPCStatusCode`](https://python.temporal.io/temporalio.service.RPCStatusCode.html) `NOT_FOUND`.
  This can happen, for example, because:

  - The Workflow was canceled or failed.

  - The Workflow completed normally or continued-as-new and the Workflow author did not [wait for handlers to be finished](/encyclopedia/workflow-message-passing#finishing-message-handlers).

### Problems when sending a Query {#query-problems}

When working with Queries, in addition to the `RPCError`s described above, you may encounter these errors:

- **There is no Workflow Worker polling the Task Queue**:
  You'll receive a [`temporalio.service.RPCError`](https://python.temporal.io/temporalio.service.RPCError.html) exception on which the `status` attribute is [`RPCStatusCode`](https://python.temporal.io/temporalio.service.RPCStatusCode.html) `FAILED_PRECONDITION`.

- **Query failed**:
  You'll receive a [`temporalio.client.WorkflowQueryFailedError`](https://python.temporal.io/temporalio.client.WorkflowQueryFailedError.html) exception if something goes wrong during a Query.
  Any exception in a Query handler will trigger this error.
  This differs from Signal and Update requests, where exceptions can lead to Workflow Task Failure instead.

- **The handler caused the Workflow Task to fail**:
  This would happen, for example, if the Query handler blocks the thread for too long without yielding.

## Dynamic components {#dynamic-handler}

A dynamic Workflow, Activity, Signal, Update, or Query is a component that isn't registered under a specific name.
Normally, these items are registered by name with the Worker and invoked at runtime.
When an unregistered or unrecognized Workflow, Activity, or message request arrives with a recognized method signature, the Worker can use a pre-registered dynamic stand-in.

For example, you might send a request to start a Workflow named "MyUnknownWorkflow".
After receiving a Workflow Task, the Worker may find that there's no registered Workflow Definition of that type.
It then checks to see if there's a registered dynamic Workflow.
If the dynamic Workflow signature matches the incoming Workflow signature, the Worker invokes it just as it would invoke a statically named Workflow.

When you register dynamic versions of your Temporal components, the Worker can fall back to these alternate implementations when a name doesn't match.
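The lookup order described above can be sketched in plain Python.
This is only a conceptual model, not how the Temporal SDK is implemented: `HandlerRegistry`, `register`, `register_dynamic`, and `dispatch` are hypothetical names invented for illustration.
The sketch passes the requested name and the raw argument tuple to the dynamic stand-in, which mirrors the shape of a dynamic message handler.

```python
from typing import Callable, Dict, Optional, Tuple

NamedHandler = Callable[..., str]
DynamicHandler = Callable[[str, Tuple[object, ...]], str]


class HandlerRegistry:
    """Hypothetical illustration only; not a Temporal SDK class."""

    def __init__(self) -> None:
        self._named: Dict[str, NamedHandler] = {}
        self._dynamic: Optional[DynamicHandler] = None

    def register(self, name: str, handler: NamedHandler) -> None:
        # Normal case: the handler is registered under a specific name.
        self._named[name] = handler

    def register_dynamic(self, handler: DynamicHandler) -> None:
        # At most one dynamic stand-in, used only when no name matches.
        self._dynamic = handler

    def dispatch(self, name: str, *args: object) -> str:
        handler = self._named.get(name)
        if handler is not None:
            return handler(*args)
        if self._dynamic is not None:
            # Fall back: the stand-in receives the requested name and the raw arguments.
            return self._dynamic(name, args)
        raise LookupError(f"No handler registered for {name!r}")


registry = HandlerRegistry()
registry.register("GreetingWorkflow", lambda who: f"Hello, {who}!")
registry.register_dynamic(
    lambda name, args: f"dynamic fallback for {name} with {len(args)} argument(s)"
)

known = registry.dispatch("GreetingWorkflow", "Ada")
unknown = registry.dispatch("MyUnknownWorkflow", "Ada")
```

Here `known` comes from the named handler, while `unknown` is produced by the dynamic stand-in because "MyUnknownWorkflow" was never registered.
Without a dynamic stand-in, the same request would raise an error instead.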

:::caution

Use dynamic elements judiciously and as a fallback mechanism, not a primary design.
They can introduce long-term maintainability and debugging issues.
Reserve dynamic invocation for cases where a name isn't, or can't be, known ahead of time.

:::

### Set a dynamic Signal, Query, or Update handler {#set-a-dynamic-signal}

A dynamic Signal, Query, or Update refers to a special stand-in handler.
It's used when an unregistered handler request arrives.

Consider a Signal, where you might send something like `handle.signal(MyWorkflow.my_signal_method, my_arg)` on a Workflow handle.
This form references the handler method directly, so a static type checker can verify that the method exists.
There's also a non-type-safe string-based form: `handle.signal('some-name', my_arg)`.
With the string-based form, the name is checked only after the request arrives at the Worker.
This is where "dynamic handlers" come in.

After failing to find a handler with a matching name and type, the Worker checks for a registered dynamic stand-in handler.
If found, the Worker uses that instead.

You must opt handlers into dynamic access.
Add `dynamic=True` to the handler decorator (for example, `@workflow.signal(dynamic=True)`) to make a handler dynamic.
The handler's signature must accept `(self, name: str, args: Sequence[RawValue])`.
Use a [payload_converter](https://python.temporal.io/temporalio.workflow.html#payload_converter) function to convert `RawValue` objects to your required type.
For example:

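```python
from typing import Sequence

from temporalio import workflow
from temporalio.common import RawValue
...

    @workflow.signal(dynamic=True)
    async def dynamic_signal(self, name: str, args: Sequence[RawValue]) -> None:
        # `name` is the Signal name that was sent; `args` holds its unconverted arguments.
        ...
```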

This sample creates a `dynamic_signal` Signal.
When an unregistered or unrecognized Signal arrives with a matching signature, dynamic assignment uses this handler to manage the Signal.
The handler is responsible for converting the argument sequence into data that its logic can act on.

### Set a dynamic Workflow {#set-a-dynamic-workflow}

A dynamic Workflow refers to a special stand-in Workflow Definition.
It's used when an unknown Workflow Execution request arrives.

Consider the "MyUnknownWorkflow" example described earlier.
The Worker may find that there's no registered Workflow Definition of that name or type.
After failing to find a Workflow Definition with a matching type, the Worker looks for a dynamic stand-in.
If found, it invokes that instead.

To participate, your Workflow must opt into dynamic access.
Adding `dynamic=True` to the `@workflow.defn` decorator makes the Workflow Definition eligible to participate in dynamic invocation.
You must register the Workflow with the [Worker](https://python.temporal.io/temporalio.worker.html) before it can be invoked.

The Workflow Definition's primary Workflow method must accept a single argument of type `Sequence[temporalio.common.RawValue]`.
Use a [payload_converter](https://python.temporal.io/temporalio.workflow.html#payload_converter) function to convert `RawValue` objects to your required type.
For example:

<div style={{backgroundColor: '#e0e0e0',padding: '0px 15px',borderRadius: '5px',border: '1px solid #cccccc',display: 'inline-block'}}><a href="https://github.com/temporalio/documentation/blob/main/sample-apps/python/dynamic_handlers/your_dynamic_workflow_dacx.py"><tt>See full sample</tt></a></div>

```python
# ...
@workflow.defn(dynamic=True)
class DynamicWorkflow:
    @workflow.run
    async def run(self, args: Sequence[RawValue]) -> str:
        name = workflow.payload_converter().from_payload(args[0].payload, str)
        return await workflow.execute_activity(
            default_greeting,
            YourDataClass("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )
```

This Workflow converts the first `Sequence` element to a string, and uses that to execute an Activity.

### Set a dynamic Activity {#set-a-dynamic-activity}

A dynamic Activity is a stand-in implementation.
It's used when an Activity Task with an unknown Activity type is received by the Worker.

To participate, your Activity must opt into dynamic access.
Adding `dynamic=True` to the `@activity.defn` decorator makes the Activity Definition eligible to participate in dynamic invocation.
You must register the Activity with the [Worker](https://python.temporal.io/temporalio.worker.html) before it can be invoked.

The Activity Definition must then accept a single argument of type `Sequence[temporalio.common.RawValue]`.
Use a [payload_converter](https://python.temporal.io/temporalio.activity.html#payload_converter) function to convert `RawValue` objects to your required types.
For example:

<div style={{backgroundColor: '#e0e0e0',padding: '0px 15px',borderRadius: '5px',border: '1px solid #cccccc',display: 'inline-block'}}><a href="https://github.com/temporalio/documentation/blob/main/sample-apps/python/dynamic_handlers/your_dynamic_activity_dacx.py"><tt>See full sample</tt></a></div>

```python
# ...
@activity.defn(dynamic=True)
async def dynamic_greeting(args: Sequence[RawValue]) -> str:
    arg1 = activity.payload_converter().from_payload(args[0].payload, YourDataClass)
    return (
        f"{arg1.greeting}, {arg1.name}!\nActivity Type: {activity.info().activity_type}"
    )
# ...
@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            "unregistered_activity",
            YourDataClass("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )
```

This example invokes an unregistered Activity by name.
The Worker resolves it using the registered dynamic Activity instead.
When possible, prefer type-checked Activity function references over Activity name strings.
